/*
 * Copyright (c) 2022 Huawei Technologies Co.,Ltd.
 *
 * DMS is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * drc.h
 *
 *
 * IDENTIFICATION
 *    src/drc/drc.h
 *
 * -------------------------------------------------------------------------
 */

#ifndef __DRC_H__
#define __DRC_H__

#include "cm_bilist.h"
#include "cm_hash.h"
#include "dms_cm.h"
#include "dms.h"
#include "mes_interface.h"
#include "cm_chan.h"
#include "cm_thread.h"
#include "cm_latch.h"
#include "cm_date.h"
#include "cm_list.h"
#include "drm.h"

#ifdef __cplusplus
extern "C" {
#endif

/* Shortcuts into the global DRC resource context g_drc_res_ctx. */
#define DRC_RES_CTX (&g_drc_res_ctx)
#define DRC_PART_MNGR (&g_drc_res_ctx.part_mngr)
/* inst_id stored for part_id in the current (part_map) and previous (old_part_map) partition maps */
#define DRC_PART_MASTER_ID(part_id) (g_drc_res_ctx.part_mngr.part_map[(part_id)].inst_id)
#define DRC_PART_OLD_MASTER_ID(part_id) (g_drc_res_ctx.part_mngr.old_part_map[(part_id)].inst_id)
/* Default resource counts. NOTE(review): presumably used to size the pools at init -- confirm at the call sites. */
#define DRC_DEFAULT_LOCK_RES_NUM (SIZE_M(1))
#define DRC_DEFAULT_GLOCK_RES_NUM (SIZE_K(100))
#define DRC_DEFAULT_LLOCK_RES_NUM (SIZE_K(100))
#define DRC_DEFAULT_ALOCK_RES_NUM (SIZE_K(10))
/* Given a pointer to the member `field` embedded in a `type`, return a pointer to the enclosing `type` object. */
#define DRC_RES_NODE_OF(type, node, field) ((type *)((char *)(node) - OFFSET_OF(type, field)))
#define DRC_RES_EXTEND_MAX_NUM DMS_MAX_INSTANCES
#define DRC_SMON_QUEUE_SIZE 10000
/* Resource map / pool of the global page (buf) and global lock resources */
#define DRC_BUF_RES_MAP (&g_drc_res_ctx.global_buf_res.res_map)
#define DRC_BUF_RES_POOL (&g_drc_res_ctx.global_buf_res.res_map.res_pool)
#define DRC_LOCK_RES_MAP (&g_drc_res_ctx.global_lock_res.res_map)
#define DRC_LOCK_RES_POOL (&g_drc_res_ctx.global_lock_res.res_map.res_pool)
#define DRC_LOCAL_RES_PART(partid) (&g_drc_res_ctx.local_res_parts[(partid)])
#define DRC_RECYCLE_THRESHOLD 0.9 /* hardcoded to 90% res pool usage */
#define DRC_RECYCLE_ALLOC_COUNT 1.1
/* True when lock_mode is DMS_LOCK_EXCLUSIVE or DMS_LOCK_SHARE. Note: lock_mode is evaluated twice. */
#define DLS_LATCH_IS_OWNER(lock_mode) ((lock_mode) == DMS_LOCK_EXCLUSIVE || (lock_mode) == DMS_LOCK_SHARE)
/* True when stat is one of LATCH_STATUS_X, LATCH_STATUS_IX or LATCH_STATUS_S. Note: stat is evaluated up to three times. */
#define DLS_LATCH_IS_LOCKED(stat) ((stat) == LATCH_STATUS_X || (stat) == LATCH_STATUS_IX || (stat) == LATCH_STATUS_S)
/* Shortcuts to the drm member of the global context and its stat sub-member */
#define DRM_CTX (&g_drc_res_ctx.drm)
#define DRM_STAT (&g_drc_res_ctx.drm.stat)

/*
 * Resource type identifiers with explicitly pinned numeric values (0..5).
 * NOTE(review): presumably stored as drc_res_map_t.res_type -- confirm against the callers of drc_res_map_init.
 */
typedef enum {
    DMS_RES_TYPE_IS_PAGE = 0,
    DMS_RES_TYPE_IS_LOCK = 1,
    DMS_RES_TYPE_IS_TXN = 2,
    DMS_RES_TYPE_IS_LOCAL_TXN = 3,
    DMS_RES_TYPE_IS_XA = 4,
    DMS_RES_TYPE_IS_ALOCK = 5,
} dms_res_type_e;

/*
 * Description of a single resource request: who asked, what mode they hold and what mode they want.
 * cvt_info_t overlays an anonymous struct on this type through a union, so the field order and sizes
 * of the two declarations have to be kept in step.
 */
typedef struct st_drc_request_info {
    uint8   inst_id;            /* the instance that the request comes from */
    uint8   curr_mode;          /* current holding lock mode in request instance */
    uint8   req_mode;           /* the expected lock mode that request instance wants */
    uint8   is_try;             /* if is try request */
    uint8   intercept_type;
    uint8   is_upgrade;         /* used for table lock upgrade */
    uint16  sess_id;            /* the session id that the request comes from */
    uint64  ruid;               /* request packet ruid */
    uint32  srsn;
    date_t  req_time;
    uint32  req_proto_ver;
    dms_session_e sess_type;    /* session type */
} drc_request_info_t;

/*
 * A request wrapped with a bilist node so it can be linked into a bilist_t.
 * NOTE(review): presumably the element type of drc_head_t.convert_q, allocated from lock_item_pool -- confirm.
 */
typedef struct st_drc_lock_item {
    bilist_node_t node;
    drc_request_info_t req_info;
} drc_lock_item_t;

/*
 * A request together with a begin_time stamp; embedded in drc_head_t as `converting`.
 * init_drc_cvt_item() resets it (begin_time = 0, invalid ids, DMS_LOCK_NULL modes).
 */
typedef struct st_drc_cvt_item {
    uint64 begin_time;
    drc_request_info_t req_info;
} drc_cvt_item_t;

/*
 * Common header placed as the first member (`head`) of drc_page_t, drc_lock_t, drc_alock_t and drc_xa_t.
 * DRC_DATA() addresses the bytes that directly follow this header, and DRC_DISPLAY() switches on `type`
 * to decide how to print the entry.
 */
typedef struct st_drc_head {
    bilist_node_t       node;               // must be first!!!
    uint8               type;
    uint8               owner;
    uint8               lock_mode;
    /* three one-byte flag bitfields: two single-bit flags plus six spare bits */
    bool8               is_using : 1;
    bool8               is_recycling : 1;
    bool8               unused : 6;
    uint16              len;
    uint16              part_id;
    uint64              copy_insts;         // printed as an integer by DRC_DISPLAY; presumably a per-instance bitmap -- confirm
    bilist_node_t       part_node;
    drc_cvt_item_t      converting;
    bilist_t            convert_q;
    spinlock_t          lock;
    atomic32_t          ref_count;
} drc_head_t;

/*
 * DRC entry for a page resource: the common header followed by the page id bytes and page-specific state.
 * `data` sits directly after `head`, which is the location DRC_DATA() computes.
 */
typedef struct st_drc_page {
    drc_head_t          head;
    char                data[DMS_PAGEID_SIZE];
    bilist_node_t       flush_node;
    uint64              owner_lsn;
    uint64              edp_map;
    uint64              last_edp_lsn;
    uint8               last_edp;
    uint8               rebuild_type;
    /* two single-bit flags plus six spare bits, then one spare byte */
    bool8               need_recover : 1;
    bool8               need_flush : 1;
    bool8               unused1 : 6;
    uint8               unused2;
    uint64              seq;
} drc_page_t;

/* DRC entry for a lock resource: the common header followed by the lock's dms_drid_t identifier. */
typedef struct st_drc_lock {
    drc_head_t          head;
    dms_drid_t          lockid;
} drc_lock_t;

/* DRC entry for an alock resource: the common header followed by its alockid_t identifier. */
typedef struct st_drc_alock {
    drc_head_t          head;
    alockid_t           alockid;
} drc_alock_t;

/* DRC entry for a global XA resource: the common header followed by the global xid. */
typedef struct st_drc_xa {
    drc_head_t          head;
    drc_global_xid_t    xid;
} drc_xa_t;

/*
 * Address of the bytes located sizeof(drc_head_t) past `drc`, as a char pointer.
 * For drc_page_t this is the `data` member; the other entry types place their id member after `head` as well.
 */
#define DRC_DATA(drc)           ((char *)(drc) + sizeof(drc_head_t))

/*
 * Bookkeeping for a pool of fixed-size items (item_size bytes each) with a free list.
 * Managed through the drc_res_pool_* functions declared in this header.
 */
typedef struct st_drc_res_pool {
    spinlock_t  lock;
    bool32      inited;
    bilist_t    free_list;
    uint32      item_num;       // total
    uint32      item_hwm;       // has been alloc from buffer
    uint32      used_num;       // item_hwm - num in free list
    uint32      extend_step;
    uint32      max_extend_num;
    bool32      need_recycle;
    uint64      item_size;
    ptlist_t    addr_list;      // NOTE(review): presumably the addresses of the allocated buffers -- confirm
} drc_res_pool_t;

/*
 * Resource pool API; the implementations live outside this header.
 * drc_res_pool_init reports its outcome as an int32 status.
 * drc_res_pool_alloc_item hands an item back as a char pointer and drc_res_pool_free_item takes one back.
 * NOTE(review): whether alloc can return NULL and which lock the callers must hold is not visible here.
 */
int32 drc_res_pool_init(drc_res_pool_t *pool, uint32 max_extend_num, uint32 res_size, uint32 res_num);
void drc_res_pool_reinit(drc_res_pool_t *pool);
void drc_res_pool_destroy(drc_res_pool_t *pool);
char *drc_res_pool_alloc_item(drc_res_pool_t *pool);
void drc_res_pool_free_item(drc_res_pool_t *pool, char *res);

/*
 * Callbacks stored in drc_res_map_t:
 *   res_cmp_callback  - takes a resource and a resource id of `len` bytes and returns a bool32
 *   res_hash_callback - takes a resource type and a resource id of `len` bytes and returns a uint32 hash
 * NOTE(review): presumably the compare callback returns true on a match -- confirm in the implementations.
 */
typedef bool32 (*res_cmp_callback)(char* res, const char* resid, uint32 len);
typedef uint32 (*res_hash_callback)(int32 res_type, char* resid, uint32 len);

/* One hash bucket of a drc_res_map_t: a list of resources with its own spinlock and version counter. */
typedef struct st_drc_res_bucket {
    spinlock_t  lock;
    uint32      bucket_version;
    bilist_t    bucket_list;
} drc_res_bucket_t;

/*
 * Hash map of resources: an array of bucket_num buckets, the pool the resources are taken from,
 * and the compare / hash callbacks supplied to drc_res_map_init.
 */
typedef struct st_drc_res_map {
    bool32      inited;
    int32       res_type;
    uint32      bucket_num;
    uint32      bucket_version;
    drc_res_pool_t      res_pool;
    drc_res_bucket_t*   buckets;
    res_cmp_callback    res_cmp_func;
    res_hash_callback   res_hash_func;
} drc_res_map_t;

/*
 * Resource map API; the implementations live outside this header.
 * drc_res_map_init reports its outcome as an int32 status.
 * Lookup and delete operate on a bucket previously obtained from drc_res_map_get_bucket for the same id.
 * NOTE(review): these functions take a bucket but no lock argument; presumably the caller is expected to
 * hold bucket->lock around add/lookup/del -- confirm in the implementation.
 */
int32 drc_res_map_init(drc_res_map_t* res_map, uint32 max_extend_num, int32 res_type, uint32 item_num,
    uint32 item_size, res_cmp_callback res_cmp, res_hash_callback res_hash);
void drc_res_map_reinit(drc_res_map_t *res_map);
void drc_res_map_destroy(drc_res_map_t* res_map);
drc_res_bucket_t* drc_res_map_get_bucket(drc_res_map_t* res_map, char* resid, uint32 len);
void drc_res_map_add_res(drc_res_bucket_t* bucket, char* res);
char* drc_res_map_lookup(const drc_res_map_t* res_map, drc_res_bucket_t* res_bucket, char* resid, uint32 len);
void drc_res_map_del_res(drc_res_map_t* res_map, drc_res_bucket_t* bucket, char* resid, uint32 len);
/* Tears down the DRC as a whole (takes no arguments; defined outside this header). */
void drc_destroy(void);

/*
 * Flat, pointer-free description of a resource: owner, mode, id bytes, lsn, instance maps and the
 * request currently being converted.
 * NOTE(review): the `_msg` name suggests this is sent between instances -- confirm; if so the layout is
 * part of the wire format and must not be reordered.
 */
typedef struct st_drc_buf_res_msg {
    uint8 claimed_owner;
    uint8 mode;
    uint8 last_edp;
    uint16 len;
    char resid[DMS_RESID_SIZE];
    uint64 lsn;
    uint64 copy_insts;
    uint64 edp_map;
    drc_request_info_t converting;
} drc_buf_res_msg_t;

/*
 * A list paired with its own spinlock. Used as the per-partition array element in
 * drc_global_res_map_t.res_parts and drc_res_ctx_t.local_res_parts.
 */
typedef struct st_drc_part_list {
    spinlock_t          lock;
    bilist_t            list;
} drc_part_list_t;

#define DRC_ACCESS_STAGE_ALL_INACCESS 0
#define DRC_ACCESS_STAGE_ALL_ACCESS 255
enum {
    PAGE_ACCESS_STAGE_ALL_INACCESS = DRC_ACCESS_STAGE_ALL_INACCESS,
    PAGE_ACCESS_STAGE_REALESE_ACCESS = 1,
    PAGE_ACCESS_STAGE_ALL_ACCESS = DRC_ACCESS_STAGE_ALL_ACCESS
};

enum {
    LOCK_ACCESS_STAGE_ALL_INACCESS = DRC_ACCESS_STAGE_ALL_INACCESS,
    LOCK_ACCESS_STAGE_NON_BIZ_SESSION_ACCESS = 1,
    LOCK_ACCESS_STAGE_ALL_ACCESS = DRC_ACCESS_STAGE_ALL_ACCESS
};

/*
 * A global resource map: a latch, the current access stage, the hash map itself and one list per partition.
 * aligned1 / aligned2 are CM_CACHE_LINE_SIZE-byte pads placed before and after res_latch.
 */
typedef struct st_drc_global_res_map {
    char aligned1[CM_CACHE_LINE_SIZE];
    latch_t res_latch;
    char aligned2[CM_CACHE_LINE_SIZE];
    volatile uint8 drc_accessible_stage;
    uint8 resvered[3]; // 3 bytes reserved
    drc_res_map_t res_map;
    drc_part_list_t res_parts[DRC_MAX_PART_NUM];
} drc_global_res_map_t;

typedef enum en_drc_mgrt_res_type {
    DRC_MGRT_RES_PAGE_TYPE,
    DRC_MGRT_RES_LOCK_TYPE,
    DRC_MGRT_RES_INVALID_TYPE,
} drc_mgrt_res_type_e;

/*
 * Partition states; values are implicit, starting at 0 for PART_INIT.
 * NOTE(review): presumably the value stored in drc_part_t.status -- confirm.
 * The enum tag uses the `st_` prefix although the other enums here use `en_`; left as is to avoid
 * breaking any external reference to the tag.
 */
typedef enum st_drc_part_status {
    PART_INIT,
    PART_NORMAL,
    PART_WAIT_MIGRATE,
    PART_MIGRATING,
    PART_WAIT_RECOVERY,
    PART_RECOVERING,
    PART_MIGRATE_COMPLETE,
    PART_MIGRATE_FAIL,
} drc_part_status_e;

/*
 * One entry of a partition map (part_map / old_part_map).
 * inst_id is what DRC_PART_MASTER_ID / DRC_PART_OLD_MASTER_ID return for the partition.
 */
typedef struct st_drc_part {
    uint8 inst_id;
    uint8 status;
    uint16 next;    // NOTE(review): presumably the index of the next partition of the same instance -- confirm
} drc_part_t;

/*
 * Per-instance partition summary (element of inst_part_tbl / old_inst_part_tbl).
 * NOTE(review): first/last presumably index the head and tail of the instance's chain of drc_part_t
 * entries linked through drc_part_t.next, with `count` entries -- confirm in the partition manager code.
 */
typedef struct st_drc_inst_part {
    uint16 first;
    uint16 last;
    uint16 count;
    uint16 expected_num;
} drc_inst_part_t;

/*
 * Partition manager: the current partition map and per-instance table, plus "old_" copies of both
 * (read through DRC_PART_OLD_MASTER_ID).
 */
typedef struct st_drc_part_mngr {
    uint32 version;
    bool8  inited;
    uint8  inst_num;
    uint8  remaster_status;
    uint8  remaster_inst;
    uint32 reversed;    // NOTE(review): presumably "reserved" (padding) -- identifier kept as is
    drc_part_t part_map[DRC_MAX_PART_NUM];
    drc_inst_part_t inst_part_tbl[DMS_MAX_INSTANCES];
    drc_part_t old_part_map[DRC_MAX_PART_NUM];
    drc_inst_part_t old_inst_part_tbl[DMS_MAX_INSTANCES];
} drc_part_mngr_t;

/*
 * The DRC context: every resource map and pool, the partition manager, and the state of the two
 * smon threads. A single instance, g_drc_res_ctx, is declared below and is what the DRC_* shortcut
 * macros in this header dereference.
 */
typedef struct st_drc_res_ctx {
    drc_res_pool_t          lock_item_pool;     /* enqueue item pool */
    drc_global_res_map_t    global_buf_res;     /* page resource map */
    drc_global_res_map_t    global_alock_res;
    drc_global_res_map_t    global_lock_res;
    drc_global_res_map_t    global_xa_res;      /* global xa resource map */
    drc_res_map_t           local_lock_res;
    drc_part_mngr_t         part_mngr;
    drc_res_map_t           txn_res_map;
    drc_res_map_t           local_txn_map;
    uint8                   deposit_map[DMS_MAX_INSTANCES];
    chan_t*                 chan;
    /* smon thread: thread object, session id and handle */
    thread_t                smon_thread;
    uint32                  smon_sid;
    void*                   smon_handle;
    /* smon recycle thread: thread object, session id, handle, plus a lock and a pause flag */
    thread_t                smon_recycle_thread;
    uint32                  smon_recycle_sid;
    void*                   smon_recycle_handle;
    spinlock_t              smon_recycle_lock;
    bool32                  smon_recycle_pause;
    drm_t                   drm;
    bool32                  start_recycled;
    atomic_t                version;
    drc_part_list_t         local_res_parts[DRC_MAX_PART_NUM];
} drc_res_ctx_t;

/* The one global context instance; defined in a source file, only declared here. */
extern drc_res_ctx_t g_drc_res_ctx;

/* Possible outcomes of an ownership request; numeric values are pinned explicitly (0..3). */
typedef enum en_drc_req_owner_result_type {
    DRC_REQ_OWNER_GRANTED       = 0,
    DRC_REQ_OWNER_ALREADY_OWNER = 1,
    DRC_REQ_OWNER_CONVERTING    = 2,
    DRC_REQ_OWNER_WAITING       = 3,
} drc_req_owner_result_type_t;

/* Result of an ownership request: the outcome type plus the current owner, instances to invalidate and a sequence. */
typedef struct st_drc_req_owner_result {
    drc_req_owner_result_type_t type;
    uint8 curr_owner_id;
    uint64 invld_insts;  // share copies to be invalidated.
    uint64 seq; // message version
} drc_req_owner_result_t;

/*
 * Conversion information filled in by drc_get_convert_info().
 * The leading union gives two views of the same bytes: the anonymous struct (req_id, req_sid, req_ruid, ...)
 * and `req_info` of type drc_request_info_t. The anonymous struct therefore has to mirror
 * drc_request_info_t field by field (same order, same sizes); any change to one must be made to the other.
 */
typedef struct st_cvt_info {
    union {
        struct {
            uint8   req_id;            /* the instance that the request comes from */
            uint8   curr_mode;          /* current holding lock mode in request instance */
            uint8   req_mode;           /* the expected lock mode that request instance wants */
            bool8   is_try;             /* if is try request */
            uint8   intercept_type;
            uint8   is_upgrade;         /* used for table lock upgrade */
            uint16  req_sid;            /* the session id that the request comes from */
            uint64  req_ruid;               /* request packet ruid */
            uint32  srsn;
            date_t  req_time;
            uint32  req_proto_ver;
            dms_session_e sess_type;    /* session type */
        };
        drc_request_info_t req_info;
    };

    /* fields below are outside the union and describe the resource and the request outcome */
    uint64  invld_insts;
    uint64  seq;
    char    resid[DMS_RESID_SIZE];
    uint8   owner_id;
    uint8   res_type;
    uint16  len;
    drc_req_owner_result_type_t type;
} cvt_info_t;

/*
 * Parameters of an ownership claim on a resource: new and old ids, the resource id bytes and type,
 * the requested mode and the claiming session.
 * NOTE(review): new_id / old_id are presumably instance ids of the new and previous owner -- confirm.
 * Note that `len` is uint32 here but uint16 in cvt_info_t, res_id_t and drc_head_t.
 */
typedef struct st_claim_info {
    uint8   new_id;
    uint8   old_id;
    bool8   has_edp;
    uint8   res_type;
    uint32  len;
    uint64  lsn;
    uint32  sess_id;
    dms_lock_mode_t req_mode;
    char    resid[DMS_RESID_SIZE];
    dms_session_e sess_type;
    uint32  srsn;
} claim_info_t;

/* A resource identifier: up to DMS_RESID_SIZE id bytes, the number of bytes in use, and the resource type. */
typedef struct st_res_id {
    char    data[DMS_RESID_SIZE];
    uint16  len;
    uint8   type;
    uint8   unused;
} res_id_t;

/*
 * Reset a converting item: begin_time is zeroed, the requester ids are set to the CM_INVALID_ID*
 * sentinels, both lock modes become DMS_LOCK_NULL and the is_try / is_upgrade flags are cleared.
 * The remaining req_info fields (intercept_type, srsn, req_time, req_proto_ver, sess_type) are
 * not written and keep their previous contents.
 */
static inline void init_drc_cvt_item(drc_cvt_item_t* converting)
{
    drc_request_info_t *req = &converting->req_info;

    /* requester identity -> invalid */
    req->inst_id = CM_INVALID_ID8;
    req->sess_id = CM_INVALID_ID16;
    req->ruid = CM_INVALID_ID64;

    /* lock modes -> null, flags -> off */
    req->curr_mode = DMS_LOCK_NULL;
    req->req_mode = DMS_LOCK_NULL;
    req->is_try = 0;
    req->is_upgrade = 0;

    converting->begin_time = 0;
}

/*
 * Map a resource type to its global resource map inside g_drc_res_ctx:
 *   DRC_RES_PAGE_TYPE      -> global_buf_res
 *   DRC_RES_LOCK_TYPE      -> global_lock_res
 *   DRC_RES_GLOBAL_XA_TYPE -> global_xa_res
 *   DRC_RES_ALOCK_TYPE     -> global_alock_res
 * Any other value calls cm_panic(0); the NULL return after it only exists to satisfy the signature.
 */
static inline drc_global_res_map_t *drc_get_global_res_map(drc_res_type_e res_type)
{
    if (res_type == DRC_RES_PAGE_TYPE) {
        return &g_drc_res_ctx.global_buf_res;
    }
    if (res_type == DRC_RES_LOCK_TYPE) {
        return &g_drc_res_ctx.global_lock_res;
    }
    if (res_type == DRC_RES_GLOBAL_XA_TYPE) {
        return &g_drc_res_ctx.global_xa_res;
    }
    if (res_type == DRC_RES_ALOCK_TYPE) {
        return &g_drc_res_ctx.global_alock_res;
    }

    /* no global map exists for this type */
    cm_panic(0);
    return NULL;
}

/*
 * page resource related API
 * drc_get_deposit_id: takes an instance id and returns a uint8 id.
 *   NOTE(review): presumably a lookup in g_drc_res_ctx.deposit_map -- confirm in the implementation.
 * drc_get_convert_info: takes a DRC header and a cvt_info_t to fill in.
 */
uint8 drc_get_deposit_id(uint8 instance_id);
void drc_get_convert_info(drc_head_t *drc, cvt_info_t *cvt_info);

/*
 * Turn on bit `num` of *bitmap, leaving every other bit untouched.
 * `num` is asserted to be below DMS_MAX_INSTANCES.
 */
static inline void bitmap64_set(uint64 *bitmap, uint8 num)
{
    CM_ASSERT(num < DMS_MAX_INSTANCES);
    *bitmap |= ((uint64)1 << num);
}

/*
 * Turn off bit `num` of *bitmap, leaving every other bit untouched.
 * `num` is asserted to be below DMS_MAX_INSTANCES.
 */
static inline void bitmap64_clear(uint64 *bitmap, uint8 num)
{
    CM_ASSERT(num < DMS_MAX_INSTANCES);
    *bitmap &= ~((uint64)1 << num);
}

/*
 * Test bit `num` of *bitmap.
 * Returns CM_TRUE when the bit is set and CM_FALSE otherwise; the bitmap is not modified.
 * `num` is asserted to be below DMS_MAX_INSTANCES.
 */
static inline bool32 bitmap64_exist(const uint64 *bitmap, uint8 num)
{
    uint64 mask;
    CM_ASSERT(num < DMS_MAX_INSTANCES);
    mask = (uint64)1 << num;
    if (((*bitmap) & mask) != 0) {
        return CM_TRUE;
    }
    return CM_FALSE;
}

/*
 * Return the position of the lowest set bit in `bitmap` (0 for the least significant bit).
 * An all-zero bitmap also yields 0, so the result cannot tell "bit 0 is set" apart from
 * "no bit is set"; callers that care have to check for an empty bitmap themselves.
 */
static inline uint8 bitmap64_get_bit_is_one(uint64 bitmap)
{
    uint8 pos = 0;

    if (bitmap == 0) {
        return 0;
    }

    /* at least one bit is set, so shifting right must eventually bring a 1 into bit 0 */
    while ((bitmap & (uint64)1) == 0) {
        bitmap >>= (uint64)1;
        ++pos;
    }
    return pos;
}

/*
 * Build a bitmap with one bit set for each of the first `inst_count` entries of `inst_id`.
 * Returns 0 when inst_count is 0 (inst_id is not read in that case).
 * NOTE(review): unlike bitmap64_set, the ids are not range-checked here; an entry of 64 or more
 * makes the shift undefined, so callers must pass valid instance ids.
 */
static inline uint64 bitmap64_create(const uint8 *inst_id, uint8 inst_count)
{
    uint64 result = 0;
    uint8 idx = 0;

    while (idx < inst_count) {
        result |= ((uint64)1 << inst_id[idx]);
        idx++;
    }
    return result;
}

/* Set difference in place: clear from *bitmap1 every bit that is set in bitmap2. */
static inline void bitmap64_minus(uint64 *bitmap1, uint64 bitmap2)
{
    *bitmap1 &= ~bitmap2;
}

/* Set union in place: turn on in *bitmap1 every bit that is set in bitmap2. */
static inline void bitmap64_union(uint64 *bitmap1, uint64 bitmap2)
{
    *bitmap1 |= bitmap2;
}

/*
 * Master lookups for a resource id of the given type; both return an int32 status and take a
 * master_id pointer (presumably the output -- confirm in the implementation).
 * NOTE(review): by name, the "old" variant presumably consults old_part_map (cf. DRC_PART_OLD_MASTER_ID)
 * and the plain one part_map -- confirm.
 */
int32 drc_get_master_id(char *resid, uint8 type, uint8 *master_id);
int32 drc_get_old_master_id(char *resid, uint8 type, uint8 *master_id);

/*
 * Write one debug log line describing a DRC entry.
 *   drc  - pointer to the entry's drc_head_t; evaluated exactly once
 *   desc - string printed as the first bracketed tag
 * Every valid type logs the resource id, owner, lock mode, copy_insts and the converting request.
 * Page entries (DRC_RES_PAGE_TYPE) additionally log the EDP fields and the recover/flush/rebuild flags.
 * Any other type logs "invalid drc type".
 * The locals carry decorated names so they cannot capture or shadow identifiers used in the caller's
 * argument expressions. The page view is only dereferenced in the page branch.
 */
#define DRC_DISPLAY(drc, desc)                                                                                      \
do {                                                                                                                \
    drc_head_t *_drc_head_ = (drc_head_t *)(drc);                                                                   \
    drc_page_t *_drc_page_ = (drc_page_t *)_drc_head_;                                                              \
    drc_request_info_t *_drc_cvt_ = &_drc_head_->converting.req_info;                                               \
    if (_drc_head_->type == DRC_RES_PAGE_TYPE) {                                                                    \
        LOG_DEBUG_INF("[%s][%s]%d-%d-%llu, CVT:%d-%d-%d-%d-%d-%llu-%d, EDP:%d-%llu-%llu, FLAG:%d-%d-%d",            \
            desc, cm_display_resid(DRC_DATA(_drc_head_), _drc_head_->type), _drc_head_->owner,                      \
            _drc_head_->lock_mode, _drc_head_->copy_insts,                                                          \
            _drc_cvt_->inst_id, _drc_cvt_->curr_mode, _drc_cvt_->req_mode, _drc_cvt_->is_try,                       \
            _drc_cvt_->sess_type, _drc_cvt_->ruid, _drc_cvt_->sess_id,                                              \
            _drc_page_->last_edp, _drc_page_->last_edp_lsn, _drc_page_->edp_map,                                    \
            _drc_page_->need_recover, _drc_page_->need_flush, _drc_page_->rebuild_type);                            \
    } else if (_drc_head_->type == DRC_RES_LOCK_TYPE || _drc_head_->type == DRC_RES_ALOCK_TYPE ||                   \
               _drc_head_->type == DRC_RES_GLOBAL_XA_TYPE) {                                                        \
        LOG_DEBUG_INF("[%s][%s]%d-%d-%llu, CVT:%d-%d-%d-%d-%d-%llu-%d",                                             \
            desc, cm_display_resid(DRC_DATA(_drc_head_), _drc_head_->type), _drc_head_->owner,                      \
            _drc_head_->lock_mode, _drc_head_->copy_insts,                                                          \
            _drc_cvt_->inst_id, _drc_cvt_->curr_mode, _drc_cvt_->req_mode, _drc_cvt_->is_try,                       \
            _drc_cvt_->sess_type, _drc_cvt_->ruid, _drc_cvt_->sess_id);                                             \
    } else {                                                                                                        \
        LOG_DEBUG_INF("invalid drc type: %d", _drc_head_->type);                                                    \
    }                                                                                                               \
} while (0)

#ifdef __cplusplus
}
#endif
#endif // __DRC_H__